Skip to content

Add probe-side runtime filters for inner joins in the multi-stage engine#18848

Open
yashmayya wants to merge 2 commits into
apache:masterfrom
yashmayya:inner-join-runtime-filter
Open

Add probe-side runtime filters for inner joins in the multi-stage engine#18848
yashmayya wants to merge 2 commits into
apache:masterfrom
yashmayya:inner-join-runtime-filter

Conversation

@yashmayya

@yashmayya yashmayya commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

What

Adds probe-side runtime filters for equi-INNER joins in the multi-stage query engine (MSE). When
the build (right) side of a hash join is small or selective, the planner builds a reducer from its
join keys and pushes it down to the probe (left) leaf scan, so the probe table drops rows
that cannot possibly match before they are shuffled across the network into the join.

This is the INNER-join counterpart of the existing SEMI-join dynamic broadcast
(PinotJoinToDynamicBroadcastRule). It is disabled by default.

Why

For the classic fact ⋈ dim shape — a large fact table joined to a small dimension table (or a heavily
filtered build side) — the MSE today hash-shuffles the entire probe (fact) side into the join stage,
even though only the rows whose join key appears on the (tiny) build side can contribute to the result.
That wastes scan, serialization, and network bandwidth proportional to the whole fact table rather than
to the matching subset.

The SEMI-join path already solves the analogous problem by replacing the join with a leaf IN filter,
but that rewrite is only legal for semi-joins (which emit left columns only). An inner join projects
columns from both sides, so the join must still run. This PR therefore makes the filter additive:
the real hash join is left completely intact, and we only add a reducer on the probe leaf.

How it works

After exchange insertion (POST_LOGICAL), PinotJoinToInnerRuntimeFilterRule rewrites an eligible
inner join:

        [ Inner Join ]   (unchanged — still hash-shuffles both sides)
        /            \
   [xChange L]    [xChange R]
       /                \
 [RuntimeFilter]    [build subtree]
    /        \
[probe leaf] [PIPELINE_BREAKER xChange]
                   |
            [ build keys: Project(rightKeys) -> Filter(IS NOT NULL) -> limit(maxBuildRows + 1) ]
  • The join and both of its HASH exchanges are kept verbatim — execution and results are identical to
    before; the filter is purely additive.
  • A new RuntimeFilterRel / RuntimeFilterNode is grafted on top of the probe leaf subtree
    (input[0] = probe pipeline, pass-through; input[1] = a PIPELINE_BREAKER mailbox carrying the
    build-side join keys). The pipeline breaker runs the build side first and ships its keys to
    the probe-leaf worker, reusing the same mechanism as the SEMI dynamic broadcast.
  • At the probe leaf, ServerPlanRequestUtils.attachRuntimeFilter ANDs a tiered, no-false-negative
    reducer onto the V1 leaf query:
    • Exact IN for small key sets (at/below a build-key-row threshold), or for multi-key /
      BIG_DECIMAL keys. This is index-accelerated and drives segment pruning.
    • Bloom filter (IN_ID_SET) above the threshold, plus a BETWEEN(min, max) range predicate for
      numeric keys to enable cheap range-based segment pruning. Bloom keeps the wire/heap footprint bounded
      for high-cardinality build sides.
  • Because the real hash join is the source of truth, the reducer can be abandoned at any point
    (empty/over-cap build, oversized bloom, unsupported subtree, mixed-version cluster) with no effect on
    results. Bloom false positives are simply re-checked and discarded by the join.

This mirrors runtime/dynamic filtering in Trino, Impala, and Spark's InjectRuntimeFilter.

When it helps

  • Large fact table joined to a small or selectively-filtered dimension/build side.
  • The probe side is a leaf scan (table scan, optionally with single-input Project/Filter), so the
    filter can be pushed all the way down to segment scan.

It is not beneficial (and is best left off) when the build side is large or non-selective, or the
probe is cheap — there is no automatic selectivity-based gate yet, so enablement is opt-in (see below).

How to use

Per-join hint (selects the reducer mode):

SELECT /*+ joinOptions(runtime_filter='auto') */ ...
FROM fact JOIN dim ON fact.key = dim.key
WHERE dim.attr = 'x'

runtime_filter accepts off | in | bloom | auto (exact IN below the threshold, else bloom).

Cluster-wide default (enable/disable only; defaults to auto when on):

pinot.broker.enable.runtime.filter.join=true

Per-query override: SET runtimeFilterJoin='on' (or off).

Thresholds & defaults

There is one user-facing switch (the enable flag / query option / hint). Every sizing threshold below is
a fixed constant in this first version — each affects only the filter's selectivity and size, never
correctness (the real hash join re-checks every surviving row), so they are intentionally not
cluster-configurable yet.

Knob Default Evaluated at Role
pinot.broker.enable.runtime.filter.join false broker config Cluster-wide enable. Overridable per query by runtimeFilterJoin and per join by the runtime_filter hint.
runtimeFilterJoin (query option) unset per query on/off enable switch; when on, defaults to the auto tier.
runtime_filter (join hint) unset per join off / in / bloom / auto — selects the reducer mode for that join.
max IN size 10000 leaf (runtime) auto emits an exact IN at/below this many build-key rows, and a bloom above it.
max build rows 1048576 (2^20) planner + leaf The build-key stage is capped at maxBuildRows + 1 rows. If the cap is hit the key set may be truncated/incomplete, so the leaf abandons the filter (results stay correct). This also bounds the pipeline-breaker memory. The planner cap and the leaf abandon read the same constant, so they cannot diverge.
bloom FPP 0.01 leaf (runtime) Target false-positive probability for the bloom tier. False positives only admit a few extra probe rows, which the join then discards.
bloom max bytes 16 MB leaf (runtime) If the serialized bloom would exceed this, the filter is abandoned (no predicate emitted).

Notes:

  • max IN size, bloom FPP, and bloom max bytes are applied on the probe-leaf server once the build
    keys are materialized; max build rows is enforced both as the planner's fetch cap on the build-key
    stage and as the leaf's truncation guard.
  • The build keys are not de-duplicated (no DISTINCT), so max IN size compares against the
    build-key row count, not the distinct-value count — the leaf IN/bloom dedups implicitly.
  • Exceeding max build rows, an oversized bloom, an empty/all-null build, or an unsupported probe shape
    all simply drop the filter — never a wrong result.

Correctness & safety

  • No false negatives. Exact IN is exact; a bloom never reports present-as-absent and its false
    positives are discarded by the real join; the BETWEEN(min, max) bounds cover every build key.
  • Null keys are excluded (they cannot match an inner equi-join) both at the planner (IS NOT NULL)
    and defensively at the leaf.
  • NaN float/double build keys keep the bloom membership but skip the range predicate (a finite
    range would wrongly drop probe NaN rows).
  • Truncation-safe. The build-key stage is capped at maxBuildRows + 1; if the cap is hit the key set
    is incomplete, so the filter is abandoned (the planner cap and the leaf abandon use the same constant).
  • Mixed-version. The only wire change is the new RuntimeFilterNode proto variant; the default-off
    flag is the guard. Enabling the flag (or using the hint) mid-rolling-upgrade can fail queries on
    not-yet-upgraded servers — documented on the config constant.
  • The reducer is built and applied entirely at the planner and the leaf-stage query, so it requires no
    changes to join execution itself.

Testing

  • PinotJoinToInnerRuntimeFilterRuleTest — rule firing/plan shape, probe-key/build-key value alignment,
    multi-key, hint/flag/query-option gating, pipeline-breaker distribution, negative cases.
  • ServerPlanRequestUtilsTest — exact-IN, bloom + range-prune, AUTO tiering, multi-key, empty/all-null
    build, null-key skip, NaN range omission, maxBytes/maxBuildRows abandon, existing-filter merge.
  • PlanNodeDeserializerTest — mixed-version graceful failure on the new proto variant.
  • RuntimeFilterJoinIntegrationTest — end-to-end cluster self-joins asserting results are identical
    with the filter on (in/bloom/auto) and off, across INT/LONG/DOUBLE/STRING/mixed-type/null/multi-key/
    empty-build cases.
  • Full pinot-query-planner (1321) and pinot-query-runtime (4431) suites pass — no regressions.

Limitations / future work

  • No automatic, statistics-driven enablement yet (opt-in via hint/flag); a future change can gate it on
    cardinality/selectivity estimates.
  • The build side is materialized a second time for the key broadcast; a shared spool would avoid this.
  • Bloom is single-key (composite-key tuple-encoding deferred); partitioned (both-sides-hash) joins use
    the broadcast path. Only the logical (HEP) planner is wired; wiring it into the v2 MSE physical
    optimizer is a follow-up.

@yashmayya yashmayya added multi-stage Related to the multi-stage query engine feature New functionality labels Jun 24, 2026
@codecov-commenter

codecov-commenter commented Jun 24, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 62.56281% with 149 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.80%. Comparing base (dc4e957) to head (ac9542e).
⚠️ Report is 6 commits behind head on master.

Files with missing lines Patch % Lines
...ry/runtime/plan/server/ServerPlanRequestUtils.java 71.62% 38 Missing and 4 partials ⚠️
.../runtime/plan/server/ServerPlanRequestVisitor.java 0.00% 18 Missing ⚠️
...he/pinot/query/planner/explain/PlanNodeMerger.java 0.00% 13 Missing ⚠️
...e/pinot/query/runtime/InStageStatsTreeBuilder.java 0.00% 12 Missing ⚠️
...e/rel/rules/PinotJoinToInnerRuntimeFilterRule.java 85.07% 5 Missing and 5 partials ⚠️
...inot/query/planner/plannode/RuntimeFilterNode.java 54.54% 6 Missing and 4 partials ⚠️
.../query/planner/logical/EquivalentStagesFinder.java 0.00% 7 Missing ⚠️
.../query/planner/logical/PlanNodeToRelConverter.java 0.00% 6 Missing ⚠️
...ry/planner/explain/PhysicalExplainPlanVisitor.java 0.00% 4 Missing ⚠️
...inot/query/planner/serde/PlanNodeDeserializer.java 60.00% 3 Missing and 1 partial ⚠️
... and 12 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18848      +/-   ##
============================================
- Coverage     64.81%   64.80%   -0.01%     
  Complexity     1322     1322              
============================================
  Files          3393     3396       +3     
  Lines        211246   211704     +458     
  Branches      33208    33300      +92     
============================================
+ Hits         136917   137195     +278     
- Misses        63284    63417     +133     
- Partials      11045    11092      +47     
Flag Coverage Δ
custom-integration1 ?
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 ?
java-21 64.80% <62.56%> (-0.01%) ⬇️
temurin 64.80% <62.56%> (-0.01%) ⬇️
unittests 64.80% <62.56%> (-0.01%) ⬇️
unittests1 57.01% <62.87%> (-0.01%) ⬇️
unittests2 37.12% <5.27%> (-0.05%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@yashmayya yashmayya force-pushed the inner-join-runtime-filter branch 3 times, most recently from 248d799 to 89fb53c Compare June 24, 2026 23:03
@gortiz

gortiz commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

Really nice piece of work — the additive design is clean, the no-false-negative reasoning is carefully laid out, and the correctness coverage (planner/leaf cap sharing a constant, null-key exclusion on both sides, NaN range omission, graceful mixed-version proto fallback, visitor completeness enforced by the non-default interface method) is thorough. A few things I'd like to discuss before merge, all around sizing/footprint and validation rather than correctness:

1. The exact-IN tier has no footprint cap. maxBytes (16 MB) only guards the bloom path, and maxInSize only chooses bloom-vs-IN for single-key AUTO. For runtime_filter='in', and for any multi-key join (including AUTO multi-key above threshold), we fall to attachDynamicFilter and emit an exact IN of up to maxBuildRows (≈1M) literals per key, AND'd across keys — a multi-MB filter expression embedded in the leaf PinotQuery with no abandon guard. Could we have the exact-IN path honor the same maxBytes abandon as bloom, so there's one consistent footprint ceiling regardless of tier? That keeps the "no new config in v1" stance while closing the gap.

2. Bloom sizing knobs aren't tunable. MAX_IN_SIZE, MAX_BUILD_ROWS, FPP, and MAX_BYTES are fixed constants with no CONFIG_OF_* keys — the only runtime-reachable attachRuntimeFilter hardcodes the defaults (the parameterized overload is test-only). I understand the intent (they affect selectivity/size, not correctness) and I'm fine deferring real config to a follow-up. Just flagging it so it's a conscious decision, and tying back to (1): MAX_BYTES is the one knob that actually bounds footprint, and right now it doesn't apply to the tier that can get largest.

3. No benchmark. This is fundamentally a perf optimization (the motivation is all scan/serialization/network savings), but there's no JMH or cluster measurement — all tests assert result parity, not speedup. Since there's no selectivity gate yet and the feature can regress when the build side is large or the probe is cheap, a benchmark on the canonical large-fact ⋈ small-dim shape — showing the win at high selectivity and the cost at low selectivity / large build — would both justify the feature and tell us how real the hazard in (1) is. pinot-perf already has MSE/join scaffolding to build on. Reasonable to defer for an opt-in v1, but worth at least one data point.

4. Minor / please confirm: NaN handling is correct for the bloom (membership kept, range dropped), but the exact-IN tier just emits IN(probeCol, …, NaN, …) via the reused computeInOperands. Since float/double keys are now first-class here, can you confirm the leaf IN predicate and the MSE hash join agree on NaN = NaN (i.e. either both match it or both drop it)? If they disagree, exact-IN could drop a probe NaN row the join would keep. Likely consistent with the existing SEMI path, but worth a sentence given how carefully NaN is handled elsewhere.

None of these block correctness — the join remaining the source of truth makes the feature safe. (1) and (4) are the ones I'd most like resolved.

When the build side of an equi-inner-join is small/selective, build a reducer (exact IN below a
threshold, else a bloom filter plus a min/max range predicate) from its distinct join keys and push
it down to the probe-side leaf scan via a pipeline-breaker edge, so the probe (fact) table drops
non-matching rows before they are shuffled into the join. The real hash join still runs and remains
the source of truth, so the filter is a no-false-negative optimization that can be omitted at any time.

Generalizes the existing SEMI dynamic-broadcast machinery to inner joins (the join is kept and the
filter is additive). Disabled by default; enabled via the runtime_filter join hint or the
pinot.broker.enable.runtime.filter.join cluster flag / runtimeFilterJoin query option.
@yashmayya yashmayya force-pushed the inner-join-runtime-filter branch from 89fb53c to 278ee24 Compare June 25, 2026 21:11
@yashmayya

Copy link
Copy Markdown
Contributor Author

Thanks for the thorough review, @gortiz — these are all fair. Addressed below; the branch is updated.

1. Exact-IN footprint cap. Fixed. attachRuntimeFilter now estimates the exact-IN serialized footprint (estimateExactInBytes, one IN list per key over the build rows) and abandons the filter when it would exceed the same maxBytes ceiling the bloom tier honors — so there is now one consistent footprint ceiling regardless of tier (exact IN mode, multi-key, or AUTO). The SEMI-shared attachDynamicFilter is deliberately left untouched: a SEMI join replaces the join with the leaf IN, so it must always emit it. Abandoning is correctness-neutral here (the real hash join still runs). Unit tests cover the abandon-above and keep-below behavior for every storable key type (INT/LONG/FLOAT/DOUBLE/STRING/BYTES/BIG_DECIMAL), the strict footprint boundary (== maxBytes keeps, one byte over abandons), and the multi-key case.

2. Sizing knobs not tunable. Kept as fixed constants for v1, as a conscious decision (and thanks for the explicit nudge). They only affect selectivity/footprint, never correctness, so I'd rather not ship CONFIG_OF_*/query-option surface that isn't wired end-to-end (the leaf would need them threaded through RuntimeFilterNode); a follow-up can add real config + thread it. Tying back to (1): maxBytes now bounds every tier, so the one knob that governs footprint applies uniformly.

3. Benchmark. Added BenchmarkRuntimeFilterJoin in pinot-perf: a 1M-row, unique-key (1:1) self-join that projects a realistically wide probe row (intCol, longCol, doubleCol, strCol, str2Col, str3Col) so the probe-side shuffle — the thing the filter reduces — dominates, which is the stated motivation (a wide fact table shuffled needlessly). A WHERE t2.intCol % buildMod = 0 controls build selectivity; I compare filter=off (baseline, no hint) vs filter=auto (hint-enabled; cluster default stays off) on INT and STRING join keys. buildMod=10000 (~0.01% build → AUTO picks exact IN) is the high-selectivity case where the reducer can shrink the probe; buildMod=1 (100% build → AUTO picks bloom) is the no-reduction case the feature must be left off for. countJoinInt (only the key crosses the network) is a reference for the overhead floor. Numbers:

Workload (1M-row self-join, wide probe projection) Build side off auto Effect
SELECT wide, STRING key 100 rows (0.01%), exact IN 139.1 ± 16.0 8.7 ± 1.4 15.9× faster
SELECT wide, INT key 100 rows (0.01%), exact IN 7.8 ± 2.3 8.0 ± 2.1 break-even (within noise)
SELECT wide, STRING key 1M rows (100%), bloom 1259 ± 86 1366 ± 112 1.08× slower
SELECT wide, INT key 1M rows (100%), bloom 1169 ± 93 1292 ± 106 1.10× slower
COUNT(*), INT key (reference) 100 rows (0.01%) 7.8 ± 2.8 8.6 ± 2.6 1.10× slower
COUNT(*), INT key (reference) 1M rows (100%) 75.5 ± 10.3 210.5 ± 57.8 2.79× slower

ms/op, AverageTime, 5×5 iterations, 1 fork, 2 servers, JDK 21; ± is the 99.9% CI.

Takeaways:

  • Big, real win where the probe is genuinely expensive to ship through the join — the wide-row STRING-key join with a 0.01% build: 15.9× (139 → 8.7 ms). This is the canonical "fact table shuffled needlessly" case: the reducer cuts the probe to the ~100 matching keys before the hash exchange.
  • Break-even where the probe is already cheap — the INT-key join at 1M rows: 7.8 → 8.0 ms, within the measurement noise. The baseline is already at the COUNT floor (the INT probe is cheap to hash/shuffle at this scale), so there's little probe-side cost for the reducer to remove and its overhead just cancels out. The benefit grows with probe scale and per-row key cost; it does not regress here.
  • Cost when nothing can be reduced — the 100% build (buildMod=1): 1.08–1.10× slower on the wide selects, and 2.79× on the bare COUNT (where broadcasting/applying the 1M-key bloom dominates an otherwise ~75 ms query and the probe can't shrink because every key matches). This is exactly the hazard you flagged in (1), and why the feature is opt-in and off by default.

The plan is additive — EXPLAIN for the hinted INT case (trimmed):

LogicalJoin(condition=[=($2, $6)], joinType=[inner])
  PinotLogicalExchange(distribution=[hash[2]])
    RuntimeFilterRel(probeKeys=[[2]], buildKeys=[[0]], filterType=[AUTO])
      LogicalProject(...) / PinotLogicalTableScan(...)                 -- probe leaf
      PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])
        LogicalSort(fetch=[1048577])                                   -- maxBuildRows + 1 truncation guard
          LogicalProject(id) / LogicalFilter(=cat,'rare') / TableScan  -- build keys
  PinotLogicalExchange(distribution=[hash[0]])                         -- real hash join, unchanged
    LogicalProject(id) / LogicalFilter(=cat,'rare') / TableScan

The reducer sits on the probe leaf, fed by a pipeline-breaker broadcast of the (capped) build keys; the hash join below is untouched, so it remains the source of truth and the reduction — not a plan change — is what drives the win.

4. NaN in exact-IN. Confirmed they agree. Added testNaNKeyMatchesBaseline: a self-join on a DOUBLE column where ids 0–2 are NaN. The MSE hash join matches NaN = NaN (baseline COUNT(*) = 9: 3 NaN probe × 3 NaN build), and the exact-IN, bloom, and AUTO reducers all reproduce exactly that — so the leaf IN predicate and the join agree on NaN, no probe-NaN row is dropped. (Consistent with the existing SEMI path, now pinned by a test.)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature New functionality multi-stage Related to the multi-stage query engine

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants